Skip to content

feat: support micro batching alternative at source#3450

Open
vaibhavtiwari33 wants to merge 15 commits into
mainfrom
strm-mvtx
Open

feat: support micro batching alternative at source#3450
vaibhavtiwari33 wants to merge 15 commits into
mainfrom
strm-mvtx

Conversation

@vaibhavtiwari33
Copy link
Copy Markdown
Contributor

@vaibhavtiwari33 vaibhavtiwari33 commented Jun 2, 2026

  • Currently source reads 1 batch of messages at a time and waits for ack/nack of the whole batch of messages it read and sent downstream.
  • This is fine for pipeline which can read multiple batches (read ahead)
  • In case of monovertex, it essentially blocks reading of additional messages until the current batch gets fully acked/nacked (which can be delayed in cases of large tail latency).

What this PR does / why we need it

This PR aims to add an opt-in feature for monovertex to enable per message ack loop (instead of per batch acking)

  • Added a field in monovertex spec to enable streaming (off by default)
  • When streaming is enabled, acks for messages are tracked individually instead of in a batch
  • Max in-flight messages are still capped using concurrency
  • Semaphores (==concurrency) are distributed per message instead of per batch
  • Most of the new logic lives in streaming_source in source.rs where we're doing the same operations as before, but on a per message level.

Implemented spec:

kind: MonoVertex
metadata:
  name: my-mvtx
spec:
  streaming: true
...

Related issues

#3452

Testing

  • Unit tests and an e2e test was added
  • Tested with running a normal ud source with and without streaming enabled

Tracing Test:

Streaming disabled; transformer returns results
Screenshot 2026-06-03 at 5 01 03 PM

Streaming enabled: transformer returns results
Screenshot 2026-06-03 at 5 02 21 PM

Streaming disabled; transformer omits results
Screenshot 2026-06-03 at 5 08 31 PM

Streaming enabled; transformer omits results
Screenshot 2026-06-03 at 5 09 31 PM

Special notes for reviewers

This is a monovertex only feature currently since pipeline already supports reading multiple batches.

Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
…. Implement watermark handling

Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
@codecov
Copy link
Copy Markdown

codecov Bot commented Jun 2, 2026

Codecov Report

❌ Patch coverage is 87.37728% with 90 lines in your changes missing coverage. Please review.
✅ Project coverage is 82.67%. Comparing base (38f3306) to head (639f079).

Files with missing lines Patch % Lines
rust/numaflow-core/src/source.rs 86.77% 82 Missing ⚠️
rust/numaflow-core/src/shared/create_components.rs 33.33% 6 Missing ⚠️
pkg/apis/numaflow/v1alpha1/mono_vertex_types.go 0.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3450      +/-   ##
==========================================
+ Coverage   82.55%   82.67%   +0.12%     
==========================================
  Files         307      307              
  Lines       77618    78169     +551     
==========================================
+ Hits        64077    64629     +552     
+ Misses      12984    12983       -1     
  Partials      557      557              

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@vaibhavtiwari33 vaibhavtiwari33 changed the title feat: remove micro batching at source feat: support micro batching alternative at source Jun 2, 2026
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
@vaibhavtiwari33 vaibhavtiwari33 marked this pull request as ready for review June 2, 2026 18:38
Comment thread rust/numaflow-core/src/source.rs Outdated
None => vec![msg_handle],
Some(transformer) => {
match transformer
.transform_batch(vec![msg_handle], cln_token.clone(), None)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you test tracing with this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't tested end to end tracing with this change but all similar tracing changes have been made as the normal path, so should be good. I'll test this branch as well.

One more change that needs to be made in a separate PR is message level metrics (replacing batch level metrics for source)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I missed creating spans for transformer in this branch. Fixing this.

Copy link
Copy Markdown
Contributor Author

@vaibhavtiwari33 vaibhavtiwari33 Jun 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated, ty for catching this!

vaibhavtiwari33 and others added 2 commits June 3, 2026 11:35
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
@vaibhavtiwari33 vaibhavtiwari33 requested a review from vigith June 4, 2026 01:58
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Copy link
Copy Markdown
Member

@vigith vigith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a WARN if streaming and readAhead are set simultaneously

"streaming=true supersedes read_ahead=true; read_ahead is ignored in streaming mode"
);
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since currently streaming is a mvtx only feature, I'll move this inside the following block:

if self.streaming {
    self.streaming_source(
        pipeline_labels,
        mvtx_labels,
        bypass_router,
        messages_tx,
        cln_token,
    )
    .await
}

Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
@vaibhavtiwari33 vaibhavtiwari33 requested a review from syayi June 4, 2026 21:49
…d bypass router are Some

Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants